home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / pyshared / apport / crashdb_impl / memory.py < prev   
Encoding:
Python Source  |  2009-04-06  |  22.8 KB  |  643 lines

  1. '''Simple in-memory CrashDatabase implementation, mainly useful for testing.
  2.  
  3. Copyright (C) 2007 Canonical Ltd.
  4. Author: Martin Pitt <martin.pitt@ubuntu.com>
  5.  
  6. This program is free software; you can redistribute it and/or modify it
  7. under the terms of the GNU General Public License as published by the
  8. Free Software Foundation; either version 2 of the License, or (at your
  9. option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
  10. the full text of the license.
  11. '''
  12.  
  13. import copy, time, os, unittest
  14. import apport.crashdb
  15. import apport
  16.  
  17. class CrashDatabase(apport.crashdb.CrashDatabase):
  18.     '''Simple implementation of crash database interface which keeps everything
  19.     in memory.
  20.     
  21.     This is mainly useful for testing and debugging.'''
  22.  
  23.     def __init__(self, auth_file, bugpattern_baseurl, options):
  24.         '''Initialize crash database connection.
  25.         
  26.         This class does not support bug patterns and authentication.'''
  27.  
  28.         apport.crashdb.CrashDatabase.__init__(self, auth_file,
  29.             bugpattern_baseurl, options)
  30.  
  31.         self.reports = [] # list of dictionaries with keys: report, fixed_version, dup_of, comment
  32.         self.unretraced = set()
  33.         self.dup_unchecked = set()
  34.  
  35.         if 'dummy_data' in options:
  36.             self.add_dummy_data()
  37.  
  38.     def upload(self, report, progress_callback = None):
  39.         '''Store the report and return a handle number (starting from 0).
  40.         
  41.         This does not support (nor need) progress callbacks.'''
  42.  
  43.         self.reports.append({'report': report, 'fixed_version': None, 'dup_of':
  44.             None, 'comment:': ''})
  45.         id = len(self.reports)-1
  46.         if 'Traceback' in report:
  47.             self.dup_unchecked.add(id)
  48.         else:
  49.             self.unretraced.add(id)
  50.         return id
  51.  
  52.     def get_comment_url(self, report, handle):
  53.         '''Return http://<sourcepackage>.bugs.example.com/<handle> for package bugs
  54.         or http://bugs.example.com/<handle> for reports without a SourcePackage.'''
  55.  
  56.         if report.has_key('SourcePackage'):
  57.             return 'http://%s.bugs.example.com/%i' % (report['SourcePackage'],
  58.                 handle)
  59.         else:
  60.             return 'http://bugs.example.com/%i' % handle
  61.  
  62.     def download(self, id):
  63.         '''Download the problem report from given ID and return a Report.'''
  64.  
  65.         return self.reports[id]['report']
  66.  
  67.     def update(self, id, report, comment = ''):
  68.         '''Update the given report ID with the retraced results from the report
  69.         (Stacktrace, ThreadStacktrace, StacktraceTop; also Disassembly if
  70.         desired) and an optional comment.'''
  71.  
  72.         self.reports[id]['report'] = report
  73.         self.reports[id]['comment'] = comment
  74.  
  75.     def get_distro_release(self, id):
  76.         '''Get 'DistroRelease: <release>' from the given report ID and return
  77.         it.'''
  78.  
  79.         return self.reports[id]['report']['DistroRelease']
  80.  
  81.     def get_unfixed(self):
  82.         '''Return an ID set of all crashes which are not yet fixed.
  83.  
  84.         The list must not contain bugs which were rejected or duplicate.
  85.         
  86.         This function should make sure that the returned list is correct. If
  87.         there are any errors with connecting to the crash database, it should
  88.         raise an exception (preferably IOError).'''
  89.  
  90.         result = set()
  91.         for i in xrange(len(self.reports)):
  92.             if self.reports[i]['dup_of'] is None and \
  93.                 self.reports[i]['fixed_version'] == None:
  94.                 result.add(i)
  95.  
  96.         return result
  97.  
  98.     def get_fixed_version(self, id):
  99.         '''Return the package version that fixes a given crash.
  100.  
  101.         Return None if the crash is not yet fixed, or an empty string if the
  102.         crash is fixed, but it cannot be determined by which version. Return
  103.         'invalid' if the crash report got invalidated, such as closed a
  104.         duplicate or rejected.
  105.  
  106.         This function should make sure that the returned result is correct. If
  107.         there are any errors with connecting to the crash database, it should
  108.         raise an exception (preferably IOError).'''
  109.  
  110.         try:
  111.             if self.reports[id]['dup_of'] != None:
  112.                 return 'invalid'
  113.             return self.reports[id]['fixed_version']
  114.         except IndexError:
  115.             return 'invalid'
  116.  
  117.     def duplicate_of(self, id):
  118.         '''Return master ID for a duplicate bug.
  119.  
  120.         If the bug is not a duplicate, return None.
  121.         '''
  122.         return self.reports[id]['dup_of']
  123.  
  124.     def close_duplicate(self, id, master):
  125.         '''Mark a crash id as duplicate of given master ID.
  126.         
  127.         If master is None, id gets un-duplicated.
  128.         '''
  129.         self.reports[id]['dup_of'] = master
  130.  
  131.     def mark_regression(self, id, master):
  132.         '''Mark a crash id as reintroducing an earlier crash which is
  133.         already marked as fixed (having ID 'master').'''
  134.  
  135.         assert self.reports[master]['fixed_version'] != None
  136.         self.reports[id]['comment'] = 'regression, already fixed in #%i' % master
  137.  
  138.     def _mark_dup_checked(self, id, report):
  139.         '''Mark crash id as checked for being a duplicate.'''
  140.  
  141.         try:
  142.             self.dup_unchecked.remove(id)
  143.         except KeyError:
  144.             pass # happens when trying to check for dup twice
  145.  
  146.     def mark_retraced(self, id):
  147.         '''Mark crash id as retraced.'''
  148.  
  149.         self.unretraced.remove(id)
  150.  
  151.     def get_unretraced(self):
  152.         '''Return an ID set of all crashes which have not been retraced yet and
  153.         which happened on the current host architecture.'''
  154.  
  155.         return self.unretraced
  156.  
  157.     def get_dup_unchecked(self):
  158.         '''Return an ID set of all crashes which have not been checked for
  159.         being a duplicate.
  160.  
  161.         This is mainly useful for crashes of scripting languages such as
  162.         Python, since they do not need to be retraced. It should not return
  163.         bugs that are covered by get_unretraced().'''
  164.  
  165.         return self.dup_unchecked
  166.  
  167.     def latest_id(self):
  168.         '''Return the ID of the most recently filed report.'''
  169.  
  170.         return len(self.reports)-1
  171.  
  172.     def add_dummy_data(self):
  173.         '''Add some dummy crash reports.
  174.  
  175.         This is mostly useful for test suites.'''
  176.  
  177.         # signal crash with source package and complete stack trace
  178.         r = apport.Report()
  179.         r['Package'] = 'libfoo1 1.2-3'
  180.         r['SourcePackage'] = 'foo'
  181.         r['DistroRelease'] = 'FooLinux Pi/2'
  182.         r['Signal'] = '11'
  183.         r['ExecutablePath'] = '/bin/crash'
  184.  
  185.         r['StacktraceTop'] = '''foo_bar (x=1) at crash.c:28
  186. d01 (x=1) at crash.c:29
  187. raise () from /lib/libpthread.so.0
  188. <signal handler called>
  189. __frob (x=1) at crash.c:30'''
  190.         self.upload(r)
  191.  
  192.         # duplicate of above crash (slightly different arguments and
  193.         # package version)
  194.         r = apport.Report()
  195.         r['Package'] = 'libfoo1 1.2-4'
  196.         r['SourcePackage'] = 'foo'
  197.         r['DistroRelease'] = 'Testux 1.0'
  198.         r['Signal'] = '11'
  199.         r['ExecutablePath'] = '/bin/crash'
  200.  
  201.         r['StacktraceTop'] = '''foo_bar (x=2) at crash.c:28
  202. d01 (x=3) at crash.c:29
  203. raise () from /lib/libpthread.so.0
  204. <signal handler called>
  205. __frob (x=4) at crash.c:30'''
  206.         self.upload(r)
  207.  
  208.         # unrelated signal crash
  209.         r = apport.Report()
  210.         r['Package'] = 'bar 42-4'
  211.         r['SourcePackage'] = 'bar'
  212.         r['DistroRelease'] = 'Testux 1.0'
  213.         r['Signal'] = '11'
  214.         r['ExecutablePath'] = '/usr/bin/broken'
  215.  
  216.         r['StacktraceTop'] = '''h (p=0x0) at crash.c:25
  217. g (x=1, y=42) at crash.c:26
  218. f (x=1) at crash.c:27
  219. e (x=1) at crash.c:28
  220. d (x=1) at crash.c:29'''
  221.         self.upload(r)
  222.  
  223.         # Python crash
  224.         r = apport.Report()
  225.         r['Package'] = 'python-goo 3epsilon1'
  226.         r['SourcePackage'] = 'pygoo'
  227.         r['DistroRelease'] = 'Testux 2.2'
  228.         r['ExecutablePath'] = '/usr/bin/pygoo'
  229.         r['Traceback'] = '''Traceback (most recent call last):
  230. File "test.py", line 7, in <module>
  231. print _f(5)
  232. File "test.py", line 5, in _f
  233. return g_foo00(x+1)
  234. File "test.py", line 2, in g_foo00
  235. return x/0
  236. ZeroDivisionError: integer division or modulo by zero'''
  237.         self.upload(r)
  238.  
  239.         # mark the python crash as fixed
  240.         self.reports[3]['fixed_version'] = '4.1'
  241.  
  242.         # Python crash reoccurs in a later version (regression)
  243.         r = apport.Report()
  244.         r['Package'] = 'python-goo 5'
  245.         r['SourcePackage'] = 'pygoo'
  246.         r['DistroRelease'] = 'Testux 2.2'
  247.         r['ExecutablePath'] = '/usr/bin/pygoo'
  248.         r['Traceback'] = '''Traceback (most recent call last):
  249. File "test.py", line 7, in <module>
  250. print _f(5)
  251. File "test.py", line 5, in _f
  252. return g_foo00(x+1)
  253. File "test.py", line 2, in g_foo00
  254. return x/0
  255. ZeroDivisionError: integer division or modulo by zero'''
  256.         self.upload(r)
  257.  
  258. #
  259. # Unit test (this also tests the dup detection API from apport/crashdb.py)
  260. #
  261.  
  262. class _MemoryCrashDBTest(unittest.TestCase):
  263.     def setUp(self):
  264.         self.crashes = CrashDatabase(None, None, {'dummy_data': '1'})
  265.  
  266.         self.assertEqual(self.crashes.get_comment_url(self.crashes.download(0),
  267.             0), 'http://foo.bugs.example.com/0')
  268.  
  269.         # test-suite internal consistency check: Python signatures are
  270.         # indeed equal and exist
  271.         assert self.crashes.download(3).crash_signature(), \
  272.             'test-suite internal check: Python crash sigs exist'
  273.         self.assertEqual(self.crashes.download(3).crash_signature(),
  274.             self.crashes.download(4).crash_signature())
  275.  
  276.         # we should have 5 crashes
  277.         self.assertEqual(self.crashes.latest_id(), 4)
  278.  
  279.     def test_no_dummy_data(self):
  280.         '''No dummy data is added by default'''
  281.  
  282.         self.crashes = CrashDatabase(None, None, {})
  283.         self.assertEqual(self.crashes.latest_id(), -1)
  284.         self.assertRaises(IndexError, self.crashes.download, 0)
  285.  
  286.     def test_retrace_markers(self):
  287.         '''Bookkeeping in retraced and dupchecked bugs'''
  288.  
  289.         self.assertEqual(self.crashes.get_unretraced(), set([0, 1, 2]))
  290.         self.assertEqual(self.crashes.get_dup_unchecked(), set([3, 4]))
  291.  
  292.     #
  293.     # Test memory.py implementation
  294.     #
  295.  
  296.     def test_submit(self):
  297.         '''Crash uploading and downloading'''
  298.  
  299.         # setUp() already checks upload() and get_comment_url()
  300.         r = self.crashes.download(0)
  301.         self.assertEqual(r['SourcePackage'], 'foo')
  302.         self.assertEqual(r['Package'], 'libfoo1 1.2-3')
  303.         self.assertEqual(self.crashes.reports[0]['dup_of'], None)
  304.  
  305.         self.assertRaises(IndexError, self.crashes.download, 5)
  306.  
  307.     def test_update(self):
  308.         '''update()'''
  309.  
  310.         r = apport.Report()
  311.         r['Package'] = 'new'
  312.  
  313.         self.crashes.update(1, r, 'muhaha')
  314.         self.assertEqual(self.crashes.download(1)['Package'], 'new')
  315.         self.assertEqual(self.crashes.reports[1]['comment'], 'muhaha')
  316.  
  317.         self.assertRaises(IndexError, self.crashes.update, 5, None)
  318.  
  319.     def test_get_distro_release(self):
  320.         '''get_distro_release()'''
  321.  
  322.         self.assertEqual(self.crashes.get_distro_release(0), 'FooLinux Pi/2')
  323.  
  324.     def test_status(self):
  325.         '''get_unfixed(), get_fixed_version(), duplicate_of(), close_duplicate()'''
  326.  
  327.         self.assertEqual(self.crashes.get_unfixed(), set([0, 1, 2, 4]))
  328.         self.assertEqual(self.crashes.get_fixed_version(0), None)
  329.         self.assertEqual(self.crashes.get_fixed_version(1), None)
  330.         self.assertEqual(self.crashes.get_fixed_version(3), '4.1')
  331.  
  332.         self.assertEqual(self.crashes.duplicate_of(0), None)
  333.         self.assertEqual(self.crashes.duplicate_of(1), None)
  334.         self.crashes.close_duplicate(1, 0)
  335.         self.assertEqual(self.crashes.duplicate_of(0), None)
  336.         self.assertEqual(self.crashes.duplicate_of(1), 0)
  337.  
  338.         self.assertEqual(self.crashes.get_unfixed(), set([0, 2, 4]))
  339.         self.assertEqual(self.crashes.get_fixed_version(1), 'invalid')
  340.  
  341.         self.assertEqual(self.crashes.get_fixed_version(99), 'invalid')
  342.  
  343.     def test_mark_regression(self):
  344.         '''mark_regression()'''
  345.  
  346.         self.crashes.mark_regression(4, 3)
  347.         self.assertEqual(self.crashes.reports[4]['comment'], 
  348.             'regression, already fixed in #3')
  349.  
  350.     #
  351.     # Test crash duplication detection API of crashdb.py
  352.     #
  353.  
  354.     def test_duplicate_db_fixed(self):
  355.         '''duplicate_db_fixed()'''
  356.  
  357.         self.crashes.init_duplicate_db(':memory:')
  358.         self.assertEqual(self.crashes.check_duplicate(0), None)
  359.  
  360.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  361.             {self.crashes.download(0).crash_signature(): (0, None)})
  362.  
  363.         self.crashes.duplicate_db_fixed(0, '42')
  364.  
  365.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  366.             {self.crashes.download(0).crash_signature(): (0, '42')})
  367.  
  368.     def test_duplicate_db_remove(self):
  369.         '''duplicate_db_remove()'''
  370.  
  371.         self.crashes.init_duplicate_db(':memory:')
  372.         self.assertEqual(self.crashes.check_duplicate(0), None)
  373.  
  374.         self.crashes.duplicate_db_remove(0)
  375.  
  376.         self.assertEqual(self.crashes._duplicate_db_dump(), {})
  377.  
  378.     def test_check_duplicate(self):
  379.         '''check_duplicate()'''
  380.  
  381.         # db not yet initialized
  382.         self.assertRaises(AssertionError, self.crashes.check_duplicate, 0,
  383.             self.crashes.download(0))
  384.         self.assertRaises(AssertionError, self.crashes.check_duplicate, 0)
  385.  
  386.         self.crashes.init_duplicate_db(':memory:')
  387.  
  388.         self.assertEqual(self.crashes._duplicate_db_dump(), {})
  389.  
  390.         # ID#0 -> no dup
  391.         self.assertEqual(self.crashes.check_duplicate(0), None)
  392.  
  393.         # ID#1 -> dup of #0
  394.         self.assertEqual(self.crashes.check_duplicate(1), (0, None))
  395.  
  396.         # ID#2 is unrelated, no dup
  397.         self.assertEqual(self.crashes.check_duplicate(2), None)
  398.  
  399.         # ID#3: no dup, master of ID#4
  400.         self.assertEqual(self.crashes.check_duplicate(3), None)
  401.         # manually poke the fixed version into the dup db; this will
  402.         # normally be done by duplicate_db_consolidate(), but let's test
  403.         # this separately
  404.         self.crashes.duplicate_db_fixed(3, '4.1')
  405.  
  406.         # check current states of real world; ID#1 is a dup and thus does
  407.         # not appear
  408.         self.assertEqual(self.crashes.get_unfixed(), set([0, 2, 4]))
  409.  
  410.         # ID#4: dup of ID#3, and a regression (fixed in 4.1, happened in 5)
  411.         self.assertEqual(self.crashes.check_duplicate(4), (3, '4.1'))
  412.  
  413.         # check crash states again; ID#4 is a regression of ID#3 in version
  414.         # 5, so it's not a real duplicate
  415.         self.assertEqual(self.crashes.get_unfixed(), set([0, 2, 4]))
  416.  
  417.         # check DB consistency; ID#1 is a dup and does not appear
  418.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  419.             {self.crashes.download(0).crash_signature(): (0, None),
  420.              self.crashes.download(2).crash_signature(): (2, None),
  421.              self.crashes.download(3).crash_signature(): (3, '4.1'),
  422.              self.crashes.download(4).crash_signature(): (4, None)})
  423.  
  424.         # add two more  Python crash dups and verify that they are dup'ed
  425.         # to the correct ID
  426.         r = copy.copy(self.crashes.download(3))
  427.         self.assertEqual(self.crashes.get_comment_url(r, self.crashes.upload(r)),
  428.             'http://pygoo.bugs.example.com/5')
  429.         self.assertEqual(self.crashes.check_duplicate(5), (3, '4.1'))
  430.  
  431.         r = copy.copy(self.crashes.download(3))
  432.         r['Package'] = 'python-goo 5.1'
  433.         self.assertEqual(self.crashes.get_comment_url(r, self.crashes.upload(r)),
  434.             'http://pygoo.bugs.example.com/6')
  435.         self.assertEqual(self.crashes.check_duplicate(6), (4, None))
  436.  
  437.         # check with unknown fixed version
  438.         self.crashes.reports[3]['fixed_version'] = ''
  439.         self.crashes.duplicate_db_fixed(3, '')
  440.  
  441.         r = copy.copy(self.crashes.download(3))
  442.         r['Package'] = 'python-goo 5.1'
  443.         self.assertEqual(self.crashes.get_comment_url(r, self.crashes.upload(r)),
  444.             'http://pygoo.bugs.example.com/7')
  445.         self.assertEqual(self.crashes.check_duplicate(7), (3, ''))
  446.  
  447.         # final consistency check
  448.         self.assertEqual(self.crashes.get_unfixed(), set([0, 2, 4]))
  449.  
  450.     def test_check_duplicate_report_arg(self):
  451.         '''check_duplicate() with explicitly passing report'''
  452.  
  453.         self.crashes.init_duplicate_db(':memory:')
  454.  
  455.         # ID#0 -> no dup
  456.         self.assertEqual(self.crashes.check_duplicate(0), None)
  457.  
  458.         # ID#2 is unrelated, no dup
  459.         self.assertEqual(self.crashes.check_duplicate(2), None)
  460.  
  461.         # report from ID#1 is a dup of #0
  462.         self.assertEqual(self.crashes.check_duplicate(2,
  463.             self.crashes.download(1)), (0, None))
  464.  
  465.     # FIXME: fix locking and enable this test
  466.     def __test_duplicate_db_consolidate_race(self):
  467.         '''Two parallel instances of duplicate_db_consolidate()
  468.         
  469.         One should immediately throw a 'locked' exception.
  470.         '''
  471.         # create db with 1000 unfixed crashes
  472.         self.crashes = CrashDatabase(None, None, {})
  473.         self.crashes.init_duplicate_db(':memory:')
  474.  
  475.         for bug in xrange(1000):
  476.             r = apport.Report()
  477.             r['Package'] = 'python-goo 3'
  478.             r['SourcePackage'] = 'pygoo'
  479.             r['ExecutablePath'] = '/usr/bin/pygoo'
  480.             r['Traceback'] = '''Traceback (most recent call last):
  481. File "test.py", line 7, in <module>
  482. print _f(5)
  483. File "test.py", line 5, in _f
  484. return g_foo00(x+1)
  485. File "test.py", line 2, in g_foo00
  486. return x/0
  487. ZeroDivisionError%i: integer division or modulo by zero''' % bug
  488.             self.assertEqual(self.crashes.get_comment_url(r, self.crashes.upload(r)),
  489.                 'http://pygoo.bugs.example.com/%i' % bug)
  490.             self.crashes.check_duplicate(bug)
  491.             # mark crash as fixed now
  492.             self.crashes.reports[bug]['fixed_version'] = str(bug)
  493.  
  494.         locked_exceptions = 0
  495.  
  496.         # run two consolidations in parallel; the child returns 0 when
  497.         # consolidation finished properly, 42 on 'db locked' exception, or
  498.         # 1 on another exception
  499.         pid = os.fork() 
  500.         if pid == 0:
  501.             try:
  502.                 self.crashes.duplicate_db_consolidate()
  503.             except Exception, e:
  504.                 if 'database is locked' in str(e):
  505.                     os._exit(42)
  506.                 else:
  507.                     raise
  508.             os._exit(0)
  509.  
  510.         try:
  511.             self.crashes.duplicate_db_consolidate()
  512.         except Exception, e:
  513.             if 'database is locked' in str(e):
  514.                 locked_exceptions += 1
  515.             else:
  516.                 raise
  517.  
  518.         # wait on child, examine status
  519.         status = os.wait()[1]
  520.         self.assert_(os.WIFEXITED(status))
  521.         status = os.WEXITSTATUS(status)
  522.         if status == 42:
  523.             locked_exceptions += 1
  524.         else:
  525.             self.assertEqual(status, 0)
  526.  
  527.         self.assertEqual(locked_exceptions, 1)
  528.  
  529.         # check consistency
  530.         for (sig, (bug, version)) in self.crashes._duplicate_db_dump().iteritems():
  531.             self.assertEqual(str(bug), version)
  532.  
  533.     def test_duplicate_db_consolidate(self):
  534.         '''duplicate_db_consolidate()'''
  535.  
  536.         self.crashes.init_duplicate_db(':memory:')
  537.         self.assertEqual(self.crashes.check_duplicate(0,
  538.             self.crashes.download(0)), None)
  539.         self.assertEqual(self.crashes.check_duplicate(2,
  540.             self.crashes.download(2)), None)
  541.         self.assertEqual(self.crashes.check_duplicate(3,
  542.             self.crashes.download(3)), None)
  543.  
  544.         # manually kill #2
  545.         self.crashes.close_duplicate(2, 0)
  546.         self.assertEqual(self.crashes.get_unfixed(), set([0, 1, 4]))
  547.  
  548.         # no fixed version for #3 yet, and obsolete #2 is still there
  549.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  550.             {self.crashes.download(0).crash_signature(): (0, None),
  551.              self.crashes.download(2).crash_signature(): (2, None),
  552.              self.crashes.download(3).crash_signature(): (3, None)})
  553.  
  554.         self.crashes.duplicate_db_consolidate()
  555.  
  556.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  557.             {self.crashes.download(0).crash_signature(): (0, None),
  558.              self.crashes.download(3).crash_signature(): (3, '4.1')})
  559.  
  560.     def test_duplicate_db_needs_consolidation(self):
  561.         '''duplicate_db_needs_consolidation()'''
  562.  
  563.         self.crashes.init_duplicate_db(':memory:')
  564.  
  565.         # a fresh and empty db does not need consolidation
  566.         self.failIf(self.crashes.duplicate_db_needs_consolidation())
  567.  
  568.         time.sleep(1.1)
  569.         # for an one-day interval we do not need consolidation
  570.         self.failIf(self.crashes.duplicate_db_needs_consolidation())
  571.         # neither for a ten second one (check timezone offset errors)
  572.         self.failIf(self.crashes.duplicate_db_needs_consolidation(10))
  573.         # but for an one second interval
  574.         self.assert_(self.crashes.duplicate_db_needs_consolidation(1))
  575.  
  576.         self.crashes.duplicate_db_consolidate()
  577.  
  578.         self.failIf(self.crashes.duplicate_db_needs_consolidation(1))
  579.  
  580.     def test_change_master_id(self):
  581.         '''duplicate_db_change_master_id()'''
  582.  
  583.         # db not yet initialized
  584.         self.assertRaises(AssertionError, self.crashes.check_duplicate, 0)
  585.  
  586.         self.crashes.init_duplicate_db(':memory:')
  587.  
  588.         self.assertEqual(self.crashes.check_duplicate(0), None)
  589.         self.assertEqual(self.crashes.check_duplicate(2), None)
  590.  
  591.         # check DB consistency
  592.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  593.             {self.crashes.download(0).crash_signature(): (0, None),
  594.              self.crashes.download(2).crash_signature(): (2, None)})
  595.  
  596.         # invalid ID (raising KeyError is *hard*, so it's not done)
  597.         self.crashes.duplicate_db_change_master_id(5, 99)
  598.  
  599.         # nevertheless, this should not change the DB
  600.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  601.             {self.crashes.download(0).crash_signature(): (0, None),
  602.              self.crashes.download(2).crash_signature(): (2, None)})
  603.  
  604.         # valid ID
  605.         self.crashes.duplicate_db_change_master_id(2, 99)
  606.  
  607.         # check DB consistency
  608.         self.assertEqual(self.crashes._duplicate_db_dump(), 
  609.             {self.crashes.download(0).crash_signature(): (0, None),
  610.              self.crashes.download(2).crash_signature(): (99, None)})
  611.  
  612.     def test_db_corruption(self):
  613.         '''Detection of DB file corruption'''
  614.  
  615.         try:
  616.             (fd, db) = tempfile.mkstemp()
  617.             os.close(fd)
  618.             self.crashes.init_duplicate_db(db)
  619.             self.assertEqual(self.crashes.check_duplicate(0), None)
  620.             self.assertEqual(self.crashes._duplicate_db_dump(), 
  621.                 {self.crashes.download(0).crash_signature(): (0, None)})
  622.             self.crashes.duplicate_db_fixed(0, '42')
  623.             self.assertEqual(self.crashes._duplicate_db_dump(), 
  624.                 {self.crashes.download(0).crash_signature(): (0, '42')})
  625.  
  626.             self.failIf(self.crashes.duplicate_db_needs_consolidation())
  627.             del self.crashes
  628.  
  629.             # damage file
  630.             f = open(db, 'r+')
  631.             f.truncate(os.path.getsize(db)*2/3)
  632.             f.close()
  633.  
  634.             self.crashes = CrashDatabase(None, None, {})
  635.             self.assertRaises(SystemError, self.crashes.init_duplicate_db, db)
  636.  
  637.         finally:
  638.             os.unlink(db)
  639.  
  640. if __name__ == '__main__':
  641.     import tempfile
  642.     unittest.main()
  643.